package com.zwh.kettle.modules.service;

import java.io.File;
import java.io.FileWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.RowMetaAndData;
import org.pentaho.di.core.exception.KettleStepException;
import org.pentaho.di.core.logging.KettleLogStore;
import org.pentaho.di.core.logging.LogLevel;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.core.row.ValueMetaInterface;
import org.pentaho.di.core.xml.XMLHandler;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.BaseStepData;
import org.pentaho.di.trans.step.RowAdapter;
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepMetaDataCombi;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.w3c.dom.Document;
import org.w3c.dom.Node;

import com.alibaba.fastjson.JSONObject;

import com.zwh.kettle.modules.dto.OutputField;
import com.zwh.kettle.modules.dto.StepPreviewDataDto;
import com.zwh.kettle.modules.dto.StepStatusDto;
import com.zwh.kettle.modules.dto.TransHopNodeWrap;
import com.zwh.kettle.modules.dto.TransResultDto;
import com.zwh.kettle.modules.dto.TransStatusDto;

/**
 * @author zwh
 * @date 2021/9/27 15:13
 **/
@Service
public class TransService {

    /**
     * 预览数据行数
     */
    @Value("${kettle.preview.previewDataRows}")
    private Integer previewDataRows = 5;

    /**
     * 启动转换，返回转换日志和预览结果的调试数据
     * @param kettleTransContent
     * @return
     */
    public TransResultDto startTransWithDebug(String kettleTransContent){
        try {
            String createObjectId = UUID.randomUUID().toString();
            KettleEnvironment.init();
            Document doc = XMLHandler.loadXMLString( kettleTransContent, false, false );
            Node transNode = XMLHandler.getSubNode( doc, TransMeta.XML_TAG );
            TransMeta transMeta = new TransMeta(transNode,null);

            transMeta.setName(createObjectId);
            transMeta.setCarteObjectId(createObjectId);

            Trans trans = new Trans(transMeta);
            trans.setLogLevel(LogLevel.BASIC);
            trans.prepareExecution(null);

            // 添加节点行数据监听器，用于获取节点输出的前一百行数据
            Map<String, List<RowMetaAndData>> previewDataMap = new HashMap<>(2);
            for (StepMetaDataCombi step : trans.getSteps()) {
                List<RowMetaAndData> rowsData = new ArrayList<>();
                previewDataMap.put(step.stepname, rowsData);
                step.step.addRowListener( new RowAdapter() {

                    @Override
                    public void rowWrittenEvent(RowMetaInterface rowMeta, Object[] row ) throws KettleStepException {
                        if ( rowsData.size() < previewDataRows ) {
                            try {
                                rowsData.add( new RowMetaAndData( rowMeta, rowMeta.cloneRow( row ) ) );
                            } catch ( Exception e ) {
                                throw new KettleStepException( "Unable to clone row for metadata : " + rowMeta, e );
                            }
                        }
                    }
                } );
            }

            // 启动转换，等待执行完成
            trans.startThreads();
            trans.waitUntilFinished();

            List<StepPreviewDataDto> dataList = new ArrayList<>();
            // 处理结果和日志数据
            if (previewDataMap.size() > 0) {
                for (Map.Entry<String, List<RowMetaAndData>> stringListEntry : previewDataMap.entrySet()) {
                    StepPreviewDataDto stepPreviewDataDto = new StepPreviewDataDto();
                    List<OutputField> columns = new ArrayList<>();
                    for (ValueMetaInterface valueMetaInterface : stringListEntry.getValue().get(0).getRowMeta().getValueMetaList()) {
                        OutputField outputField = new OutputField();
                        outputField.setName(valueMetaInterface.getName());
                        outputField.setType(valueMetaInterface.getTypeDesc());
                        columns.add(outputField);
                    }
                    Object[] data = stringListEntry.getValue().stream().map(RowMetaAndData::getData).toArray();
                    stepPreviewDataDto.setDataArray(data);
                    stepPreviewDataDto.setFields(columns);
                    stepPreviewDataDto.setStepName(stringListEntry.getKey());
                    dataList.add(stepPreviewDataDto);
                }
            }

            // 转换任务执行日志和各个节点状态处理保存到日志文件
            int lastLineNr = KettleLogStore.getLastBufferLineNr();
            String logData = KettleLogStore.getAppender().getBuffer(
                    trans.getLogChannel().getLogChannelId(), false, 0, lastLineNr ).toString();

            TransStatusDto transStatusDto = new TransStatusDto(trans.getName(), trans.getContainerObjectId(), trans.getStatus());
            transStatusDto.setFirstLoggingLineNr(0);
            transStatusDto.setLastLoggingLineNr(lastLineNr);
            transStatusDto.setLogDate(trans.getLogDate());
            for ( int i = 0; i < trans.nrSteps(); i++ ) {
                StepInterface baseStep = trans.getRunThread( i );
                if ( ( baseStep.isRunning() ) || baseStep.getStatus() != BaseStepData.StepExecutionStatus.STATUS_EMPTY ) {
                    StepStatusDto stepStatus = new StepStatusDto( baseStep );
                    transStatusDto.getStepStatusList().add( stepStatus );
                }
            }

            transStatusDto.setLoggingString(logData);
            transStatusDto.setPaused( trans.isPaused() );
            TransResultDto resultDto = new TransResultDto();
            resultDto.setStepPreviewDataDto(dataList);
            resultDto.setTransStatus(transStatusDto);
            return resultDto;
        } catch (Exception e1) {
            e1.printStackTrace();
            return null;
        }
    }


    /**
     * 启动转换，返回转换日志和预览结果的调试数据
     * 只获取最后一个节点预览数据
     * @param kettleTransContent
     * @return
     */
    public TransResultDto startTransWithDebugAndLastNode(String kettleTransContent){
        try {
            String createObjectId = UUID.randomUUID().toString();
            KettleEnvironment.init();
            Document doc = XMLHandler.loadXMLString( kettleTransContent, false, false );
            Node transNode = XMLHandler.getSubNode( doc, TransMeta.XML_TAG );
            TransMeta transMeta = new TransMeta(transNode,null);

            transMeta.setName(createObjectId);
            transMeta.setCarteObjectId(createObjectId);

            Trans trans = new Trans(transMeta);
            trans.setLogLevel(LogLevel.BASIC);
            trans.prepareExecution(null);

            // 添加节点行数据监听器，用于获取节点输出数据
            Map<String, List<RowMetaAndData>> previewDataMap = new HashMap<>(2);
            boolean isLast = false;
            Map<String, TransHopNodeWrap> lineLink = new HashMap<>(2);
            for (TransHopMeta transHop : trans.getTransMeta().getTransHops()) {
                if (lineLink.isEmpty()) {
                    TransHopNodeWrap wrap = new TransHopNodeWrap(transHop.getFromStep(),
                            new HashSet<>(), new HashSet<>());
                    wrap.getToNames().add(transHop.getToStep().getName());
                    lineLink.put(transHop.getFromStep().getName(), wrap);

                    TransHopNodeWrap wrapTo = new TransHopNodeWrap(transHop.getToStep(),
                            new HashSet<>(), new HashSet<>());
                    wrapTo.getFromNames().add(transHop.getFromStep().getName());
                    lineLink.put(transHop.getToStep().getName(), wrapTo);
                } else {
                    // from节点
                    TransHopNodeWrap wrapItem = lineLink.get(transHop.getFromStep().getName());
                    if (wrapItem == null) {
                        wrapItem = new TransHopNodeWrap(transHop.getFromStep(),
                                new HashSet<>(), new HashSet<>());
                        lineLink.put(transHop.getFromStep().getName(), wrapItem);
                    }
                    wrapItem.getToNames().add(transHop.getToStep().getName());

                    // to 节点
                    TransHopNodeWrap wrapItemTo = lineLink.get(transHop.getToStep().getName());
                    if (wrapItemTo == null) {
                        wrapItemTo = new TransHopNodeWrap(transHop.getToStep(),
                                new HashSet<>(), new HashSet<>());
                        lineLink.put(transHop.getToStep().getName(), wrapItemTo);
                    }
                    wrapItemTo.getFromNames().add(transHop.getFromStep().getName());
                }
            }

            String lastNodeName = null;
            for (Map.Entry<String, TransHopNodeWrap> stringTransHopNodeWrapEntry : lineLink.entrySet()) {
                if (stringTransHopNodeWrapEntry.getValue().getToNames().isEmpty()) {
                    lastNodeName = stringTransHopNodeWrapEntry.getValue().getNode().getName();
                    break;
                }
            }
            if (lastNodeName == null) {
                return null;
            }

            StepMetaDataCombi step = null;
            for (StepMetaDataCombi stepMetaDataCombi : trans.getSteps()) {
                if (lastNodeName.equals(stepMetaDataCombi.stepname)) {
                    step = stepMetaDataCombi;
                    break;
                }
            }
            List<RowMetaAndData> rowsData = new ArrayList<>();
            previewDataMap.put(step.stepname, rowsData);
            step.step.addRowListener( new RowAdapter() {
                @Override
                public void rowWrittenEvent(RowMetaInterface rowMeta, Object[] row ) throws KettleStepException {
                    if ( rowsData.size() < previewDataRows ) {
                        try {
                            rowsData.add( new RowMetaAndData( rowMeta, rowMeta.cloneRow( row ) ) );
                        } catch ( Exception e ) {
                            throw new KettleStepException( "Unable to clone row for metadata : " + rowMeta, e );
                        }
                    }
                }
            } );

            // 启动转换，等待执行完成
            trans.startThreads();
            trans.waitUntilFinished();

            List<StepPreviewDataDto> dataList = new ArrayList<>();
            // 处理结果和日志数据
            if (previewDataMap.size() > 0) {
                for (Map.Entry<String, List<RowMetaAndData>> stringListEntry : previewDataMap.entrySet()) {
                    StepPreviewDataDto stepPreviewDataDto = new StepPreviewDataDto();
                    List<OutputField> columns = new ArrayList<>();
                    for (ValueMetaInterface valueMetaInterface : stringListEntry.getValue().get(0).getRowMeta().getValueMetaList()) {
                        OutputField outputField = new OutputField();
                        outputField.setName(valueMetaInterface.getName());
                        outputField.setType(valueMetaInterface.getTypeDesc());
                        columns.add(outputField);
                    }
                    Object[] data = stringListEntry.getValue().stream().map(RowMetaAndData::getData).toArray();
                    stepPreviewDataDto.setDataArray(data);
                    stepPreviewDataDto.setFields(columns);
                    stepPreviewDataDto.setStepName(stringListEntry.getKey());
                    dataList.add(stepPreviewDataDto);
                }
            }

            // 转换任务执行日志和各个节点状态处理保存到日志文件
            int lastLineNr = KettleLogStore.getLastBufferLineNr();
            String logData = KettleLogStore.getAppender().getBuffer(
                    trans.getLogChannel().getLogChannelId(), false, 0, lastLineNr ).toString();

            TransStatusDto transStatusDto = new TransStatusDto(trans.getName(), trans.getContainerObjectId(), trans.getStatus());
            transStatusDto.setFirstLoggingLineNr(0);
            transStatusDto.setLastLoggingLineNr(lastLineNr);
            transStatusDto.setLogDate(trans.getLogDate());
            for ( int i = 0; i < trans.nrSteps(); i++ ) {
                StepInterface baseStep = trans.getRunThread( i );
                if ( ( baseStep.isRunning() ) || baseStep.getStatus() != BaseStepData.StepExecutionStatus.STATUS_EMPTY ) {
                    StepStatusDto stepStatus = new StepStatusDto( baseStep );
                    transStatusDto.getStepStatusList().add( stepStatus );
                }
            }

            transStatusDto.setLoggingString(logData);
            transStatusDto.setPaused( trans.isPaused() );
            TransResultDto resultDto = new TransResultDto();
            resultDto.setStepPreviewDataDto(dataList);
            resultDto.setTransStatus(transStatusDto);
            return resultDto;
        } catch (Exception e1) {
            e1.printStackTrace();
            return null;
        }
    }

    public int getPreviewDataRows() {
        return previewDataRows;
    }

    public void setPreviewDataRows(int previewDataRows) {
        this.previewDataRows = previewDataRows;
    }
}
